package com.ruyuan.little.project.rocketmq.api.coupon.listener;

import com.alibaba.fastjson.JSON;
import com.ruyuan.little.project.common.dto.CommonResponse;
import com.ruyuan.little.project.common.enums.ErrorCodeEnum;
import com.ruyuan.little.project.common.enums.LittleProjectTypeEnum;
import com.ruyuan.little.project.redis.api.RedisApi;
import com.ruyuan.little.project.rocketmq.api.coupon.dto.FirstLoginMessageDTO;
import com.ruyuan.little.project.rocketmq.api.coupon.service.CouponService;
import com.ruyuan.little.project.rocketmq.common.constants.RedisKeyConstant;
import org.apache.dubbo.config.annotation.Reference;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;

/**
 * 第一次登录成功的Listener
 */
@Component
public class FirstLoginMessageListener implements MessageListenerConcurrently {

    private static final Logger LOGGER = LoggerFactory.getLogger(FirstLoginMessageListener.class);

    @Resource
    private CouponService couponService;

    /**
     * redis dubbo服务
     */
    @Reference(version = "1.0.0",
            interfaceClass = RedisApi.class,
            cluster = "failfast",check = false)
    private RedisApi redisApi;

    /**
     * 第一次登陆下发的优惠券id
     */
    @Value("${first.login.couponId}")
    private Integer firstLoginCouponId;

    /**
     * 第一次登陆优惠券有效天数
     */
    @Value("${first.login.coupon.day}")
    private Integer firstLoginCouponDay;

    /**
     * 消费 第一登录消息 发送用户优惠券
     * 幂等
     * 对于消息中间件来说，消费者处理消息是必须要做好幂等的，因为有各种可能会导致消息重复消费，比如网络抖动，消费者ACK没有收到
     * 那这里采用的是redis + setnx
     * @param msgs
     * @param context
     * @return
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

        Integer userId = null;
        String phoneNumber = null;

        for (MessageExt msg : msgs) {
            String body = new String(msg.getBody(), StandardCharsets.UTF_8);

            try {
                LOGGER.info("received login success message:{}", body);

                FirstLoginMessageDTO firstLoginMessageDTO = JSON.parseObject(body, FirstLoginMessageDTO.class);
                userId = firstLoginMessageDTO.getUserId();
                phoneNumber = firstLoginMessageDTO.getPhoneNumber();

                CommonResponse<Boolean> response = redisApi.setnx(RedisKeyConstant.FIRST_LOGIN_DUPLICATION_KEY_PREFIX + userId, String.valueOf(userId), phoneNumber, LittleProjectTypeEnum.ROCKETMQ);

                if (Objects.equals(response.getCode(), ErrorCodeEnum.FAIL.getCode())) {
                    // 请求redis dubbo接口失败
                    LOGGER.info("consumer first login message redis dubbo interface fail userId:{}", userId);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

                if (Objects.equals(response.getCode(), ErrorCodeEnum.SUCCESS.getCode()) && Objects.equals(response.getData(), Boolean.FALSE)) {
                    // 重复消费
                    LOGGER.info("duplicate consumer first login message userId:{}", userId);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                } else {
                    couponService.distributeCoupon(firstLoginMessageDTO.getBeid(),
                            firstLoginMessageDTO.getUserId(), firstLoginCouponId, firstLoginCouponDay, 0, phoneNumber);

                    LOGGER.info("distribute userId:{} first login coupon end", userId);
                }
            } catch (Exception e) {
                // 异常处理
                if (null != userId){
                    redisApi.del(RedisKeyConstant.FIRST_LOGIN_DUPLICATION_KEY_PREFIX + userId,phoneNumber,LittleProjectTypeEnum.ROCKETMQ);
                }
                // 消费失败
                LOGGER.info("received login success message:{}, consumer fail", body);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
